"""
This module contains some wrappers for using PSI4 to perform calculations.
https://psicode.org/
"""
from BigDFT.Calculators import Runner
from futile.Utils import write as safe_print
[docs]def bigdft_to_psi4(sysA, sysB=None, chargeA=0, multiplicityA=None, chargeB=0,
multiplicityB=None):
"""
Create a PSI4 dimer molecule which can be used for SAPT.
If multiple molecules are specified, we create a Dimer system.
Args:
sysA (BigDFT.Systems.System): the first molecule.
sysB (BigDFT.Systems.System): the second molecule (optional).
chargeA (int): the charge of the first molecule.
chargeB (int): the charge of the second molecule.
multiplicityA (int): the multiplicity for unpaired electrons.
multiplicityB (int): the multiplicity for unpaired electrons.
Returns:
(psi4.core.Molecule): the psi4 geometry.
"""
from BigDFT.IO import write_xyz
from psi4 import geometry
# py2 workaround
from sys import version_info
if version_info[0] < 3:
from io import BytesIO as StringIO
else:
try:
from io import StringIO
except ImportError:
from StringIO import StringIO
# Figure out the multiplicity if not specified
if multiplicityA is None:
nel = 0
for frag in sysA.values():
nel += sum([x.atomic_number for x in frag])
nel += chargeA
if nel % 2 != 0:
multiplicityA = 2
else:
multiplicityA = 1
pass
if sysB is not None and multiplicityB is None:
nel = 0
for frag in sysB.values():
nel += sum([x.atomic_number for x in frag])
nel += chargeB
if nel % 2 != 0:
multiplicityB = 2
else:
multiplicityB = 1
# Create the right string representations
strA = StringIO()
write_xyz(sysA, strA)
xyzA = str(chargeA) + " " + str(multiplicityA) + "\n"
xyzA += "\n".join(strA.getvalue().split("\n")[2:])
if sysB is not None:
strB = StringIO()
write_xyz(sysB, strB)
xyzB = str(chargeB) + " " + str(multiplicityB) + "\n"
xyzB += "\n".join(strB.getvalue().split("\n")[2:])
if sysB is None:
return geometry("\n"+xyzA+"\nunits angstrom\n")
else:
return geometry("\n"+xyzA+"--\n"+xyzB+"units angstrom\n")
[docs]class PSI4Calculator(Runner):
"""
Perform a calculation on a given system using the PSI4 code.
Note that if you intend to use an SAPT method, you need to pass a
system which is composed of exactly two fragments.
PSI4 has a number of different actions, ab initio methods, and basis
sets. Be sure to specify each of these to the run command.
"""
import os
def __init__(self, omp=os.environ.get('OMP_NUM_THREADS', '1'),
skip=False, verbose=True):
# Use the initialization from the Runner class (so all options inside
# __global_options)
self.mol = None
Runner.__init__(self, omp=str(omp), dry_run=False, skip=skip,
verbose=verbose)
[docs] def pre_processing(self):
"""
Process local run dictionary to create the input directory and identify
the command to be passed
Returns:
:py:class:`dict`: dictionary containing the command to be passed to
:meth:`process_run`
"""
from psi4 import set_options, energy, optimize, frequency
from psi4.core import set_output_file
from BigDFT.Systems import System
from os.path import join
# Check Arguments
try:
self.sys = self.run_options["sys"]
except KeyError:
raise ValueError("sys= must be provided as an argument")
try:
action = self.run_options["action"]
except KeyError:
raise ValueError("You must specify a valid action=(energy,"
" optimize, frequency")
try:
method = self.run_options["method"]
except KeyError:
raise ValueError("You must specify an ab initio method=")
try:
basis = self.run_options["basis"]
except KeyError:
raise ValueError("You must specify a basis=")
chargeA = self.run_options.get("chargeA", 0)
chargeB = self.run_options.get("chargeB", 0)
multiplicityA = self.run_options.get("multiplicityA", None)
multiplicityB = self.run_options.get("multiplicityB", None)
# Run directory
self._ensure_run_directory()
# Check skip
if self.run_options["skip"]:
try:
log = PSI4Logfile(self.sys, self._get_logname(True),
action, method)
return {"command": None}
except ValueError: # invalid logfile, so we can't skip.
pass
except IOError: # no logfile, so we can't skip.
pass
# Convert system
if "sapt" in method:
keys = list(self.sys)
if len(keys) != 2:
raise ValueError("For SAPT method, your system must be"
" composed of exactly two fragments.")
sysA = System()
sysA[keys[0]] = self.sys[keys[0]]
sysB = System()
sysB[keys[1]] = self.sys[keys[1]]
mol = bigdft_to_psi4(sysA=sysA, sysB=sysB,
chargeA=chargeA, chargeB=chargeB,
multiplicityA=multiplicityA,
multiplicityB=multiplicityB)
else:
mol = bigdft_to_psi4(sysA=self.sys, chargeA=chargeA,
multiplicityA=multiplicityA)
# Set extra options.
if "psi4_options" in self.run_options:
set_options(self.run_options["psi4_options"])
set_output_file(self._get_logname(True), False)
# Instead of a command, we return a closure.
if action == "energy":
def cmd(method, mol):
energy(method, molecule=mol)
elif action == "optimize":
def cmd(method, mol):
optimize(method, molecule=mol)
elif action == "frequency":
def cmd(method, mol):
frequency(method, molecule=mol)
return {"command": cmd(join(method, basis), mol)}
[docs] def process_run(self, command):
"""
Run the psi4 command.
"""
from os import environ, system
# Set the number of omp threads only if the variable is not present
# in the environment
if 'OMP_NUM_THREADS' not in environ:
environ['OMP_NUM_THREADS'] = self.run_options['omp']
if self.run_options['verbose']:
if self.run_dir != '.':
safe_print('Run directory', self.run_dir)
# Run the command
if command is not None:
command()
return {'logname': self._get_logname(True)}
[docs] def post_processing(self, logname, command):
"""
Post processing the calculation.
Returns:
(BigDFT.Interop.PSI4Interop.PSI4Logfile): a representation of the
logfile.
"""
try:
return PSI4Logfile(self.sys, fname=logname,
action=self.run_options["action"],
method=self.run_options["method"])
except IOError:
raise ValueError("The logfile (", logname, ") does not exist.")
def _ensure_run_directory(self):
from futile.Utils import ensure_dir
run_dir = self.run_options.get('run_dir', '.')
# Create the run_dir if not exist
if ensure_dir(run_dir) and self.run_options['verbose']:
safe_print("Create the sub-directory '%s'" % run_dir)
self.run_dir = run_dir
def _get_logname(self, full):
from os.path import join
name = self.run_options.get("name", "sapt") + ".log"
if full:
run_dir = self.run_options.get('run_dir', '.')
return join(run_dir, name)
else:
return name
[docs]class PSI4Logfile(dict):
"""
A logfile wrapper for an SAPT calculation.
This logfile inherits from a dictionary, as the properties we get out
of a calculation depend on the type of calculation performed.
Args:
fname (str): the name of the file to process.
action (str): the action that was used when running SAPT (energy,
optimize, frequency).
method (str): the scf method used.
"""
def __init__(self, sys, fname, action, method):
dict.__init__({})
self["energy"] = {}
self.sys = sys
# The logfile, broken up into a list of lines.
self.log = self._log_to_str(fname)
if "sapt" in method:
self._extract_sapt_energies(method)
elif "energy" in action:
self._extract_total_energy()
elif "optimize" in action:
self._extract_optimization_energies()
self._extract_optimization_geometries()
def _log_to_str(self, fname):
with open(fname) as ifile:
lstr = "".join([x for x in ifile])
return lstr.split("\n")
def _look_for(self, goal):
for offset, line in enumerate(self.log):
if goal in line:
break
if offset > len(self.log):
raise ValueError("Couldn't find property in logfile: ", goal)
return offset
def _extract_optimization_energies(self):
self["max force"] = []
self["energy"]["optimize"] = []
offset = self._look_for("==> Optimization Summary <==")
for line in self.log[offset+6:]:
if "---" in line:
break
split = line.split()
self["max force"].append(float(split[3]))
self["energy"]["optimize"].append(float(split[1]))
self["energy"]["total"] = self["energy"]["optimize"][-1]
def _extract_optimization_geometries(self):
from BigDFT.Systems import System
from BigDFT.Fragments import RotoTranslation
from copy import deepcopy
# Read out the posinps
posinps = []
geomcount = 0
for i in range(len(self.log)):
line = self.log[i]
if "==> Geometry <==" in line:
# It is printed twice, once for scf and one for gradient.
geomcount += 1
if geomcount % 2 == 0:
continue
# Extract
geom = {"positions": [], "units": "angstroem"}
j = 9
while(True):
split = self.log[i + j].split()
if len(split) == 0:
break
atom = {split[0].title(): [float(x) for x in split[1:4]]}
geom["positions"].append(atom)
j += 1
posinps.append(geom)
# Convert the posinps into first class system objects.
self["positions"] = []
for pos in posinps:
self["positions"].append(deepcopy(self.sys))
self["positions"][-1].update_positions_from_dict(pos)
# PSI4 does rototranslations of the geometry, so we need to undo it.
rt = {}
for fragid in self.sys:
rt[fragid] = RotoTranslation(self["positions"][0][fragid],
self.sys[fragid])
for osys in self["positions"]:
for fragid in self.sys:
osys[fragid] = rt[fragid].dot(osys[fragid])
def _extract_total_energy(self):
for line in self.log:
if "Total Energy =" in line:
self["energy"]["total"] = float(line.split()[-1])
if "total" not in self["energy"]:
raise ValueError("Could you find total energy in log file.")
def _extract_sapt_energies(self, method):
def _check_vals():
if any([x not in vals for x in ["total", "exchange", "induction",
"dispersion", "electrostatics"]]):
raise ValueError("Could not find sapt energies in log file")
# Get main computation
vals = {}
offset = self._look_for("SAPT Results")
for line in self.log[offset:]:
if "Electrostatics" in line:
vals["electrostatics"] = float(line.split()[1]) / 1000.0
elif "Exchange" in line:
vals["exchange"] = float(line.split()[1]) / 1000.0
elif "Induction" in line:
vals["induction"] = float(line.split()[1]) / 1000.0
elif "Dispersion" in line:
vals["dispersion"] = float(line.split()[1]) / 1000.0
elif "Total" in line:
vals["total"] = float(line.split()[2]) / 1000.0
break
_check_vals()
self["energy"][method] = vals
[docs]def _example():
"""Example of using PSI4 interoperability"""
from BigDFT.IO import XYZReader
from BigDFT.Systems import System
from BigDFT.Fragments import Fragment
from os.path import join
from copy import deepcopy
# Create a system.
reader = XYZReader(join("Database", "XYZs", "He.xyz"))
fsys = System()
fsys["FRA:1"] = Fragment(xyzfile=reader)
fsys["FRA:2"] = deepcopy(fsys["FRA:1"])
fsys["FRA:2"].translate([-2, 0, 0])
# Create a calculator.
code = PSI4Calculator()
log = code.run(sys=fsys, action="energy", basis="jun-cc-pvdz",
psi4_options={"scf_type": "direct"}, method="scf",
name="test")
print(log)
# The raw logfile is also available.
print(log.log[4])
# Geometry optimization.
log = code.run(sys=fsys, action="optimize", basis="jun-cc-pvdz",
method="scf", name="test-opt")
print(log)
# SAPT
log = code.run(sys=fsys, action="energy", basis="jun-cc-pvdz",
method="sapt0", name="test-sapt")
print(log)
if __name__ == "__main__":
_example()